package consumer;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple1;
import scala.Tuple2;
import testjson.phoenix;

/**
 * Spark Streaming driver that subscribes to a Kafka topic through the
 * kafka-0-10 direct stream API and passes the value of every consumed record
 * to {@code phoenix.JSONphoenix}.
 *
 * <p>Kafka auto-commit is disabled, so consumed offsets are committed back to
 * Kafka explicitly once the output action of each batch has finished. A batch
 * whose processing throws is therefore not committed and its records are read
 * again after a restart (at-least-once delivery).
 */
public class Sparkkafka {

    /** Master used only when none was supplied from outside (e.g. by spark-submit). */
    private static final String DEFAULT_MASTER = "local[2]";

    private static final String BOOTSTRAP_SERVERS = "192.168.0.14:9092,192.168.0.15:9092";

    private static final String GROUP_ID = "use_a_separate_group_id_for_each_stream";

    private static final String TOPIC = "maxwell";

    /**
     * Builds the consumer configuration handed to the Kafka direct stream.
     *
     * @return a fresh, mutable map of Kafka consumer properties
     */
    private static Map<String, Object> kafkaParams() {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        params.put("key.deserializer", StringDeserializer.class);
        params.put("value.deserializer", StringDeserializer.class);
        params.put("group.id", GROUP_ID);
        params.put("auto.offset.reset", "latest");
        // Offsets are committed manually after each batch, see SparkStreamingReadProduct().
        params.put("enable.auto.commit", false);
        return params;
    }

    /**
     * Creates the streaming context, wires the Kafka stream to the record
     * handler, starts the job and blocks until it terminates.
     *
     * <p>The streaming context is always stopped before this method returns,
     * including when the job fails or the waiting thread is interrupted.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the streaming job to terminate
     */
    public static void SparkStreamingReadProduct() throws InterruptedException {
        // setIfMissing keeps local[2] as the default but no longer overrides
        // a master that was configured externally.
        SparkConf conf = new SparkConf()
                .setIfMissing("spark.master", DEFAULT_MASTER)
                .setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        try {
            Collection<String> topics = Arrays.asList(TOPIC);

            JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams())
                    );

            // NOTE(review): phoenixs is captured by the per-record function below,
            // which Spark ships to the tasks; presumably phoenix has to be
            // Serializable for that to work - confirm against the phoenix class.
            final phoenix phoenixs = new phoenix();

            stream.foreachRDD(rdd -> {
                // Offset ranges are only available on the RDD produced directly by
                // the Kafka stream, so they are read before any transformation.
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

                rdd.foreach(record -> {
                    String payload = record.value();
                    System.out.println(payload + "------------------------");

                    phoenixs.JSONphoenix(payload);
                    System.out.println("++++++++++++++++++++++++++++++++++++++++++");
                });

                // Reached only when the batch above completed without throwing.
                ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
            });

            jssc.start();
            jssc.awaitTermination();
        } finally {
            // Release the streaming context even on failure or interruption.
            jssc.stop();
        }
    }

    /**
     * Command-line entry point; arguments are ignored.
     *
     * @param args unused
     * @throws InterruptedException if the streaming job is interrupted while running
     */
    public static void main(String[] args) throws InterruptedException {
        SparkStreamingReadProduct();
    }
}
